diff --git a/swh/web/admin/mailmap.py b/swh/web/admin/mailmap.py
deleted file mode 100644
index 4dc80281..00000000
--- a/swh/web/admin/mailmap.py
+++ /dev/null
@@ -1,16 +0,0 @@
-# Copyright (C) 2022 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU Affero General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-from django.contrib.auth.decorators import permission_required
-from django.shortcuts import render
-
-from swh.web.admin.adminurls import admin_route
-from swh.web.auth.utils import MAILMAP_ADMIN_PERMISSION
-
-
-@admin_route(r"mailmap/", view_name="admin-mailmap")
-@permission_required(MAILMAP_ADMIN_PERMISSION)
-def _admin_mailmap(request):
- return render(request, "admin/mailmap.html")
diff --git a/swh/web/admin/urls.py b/swh/web/admin/urls.py
index 4b06b3b2..dc8243b1 100644
--- a/swh/web/admin/urls.py
+++ b/swh/web/admin/urls.py
@@ -1,29 +1,28 @@
# Copyright (C) 2018-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from django.contrib.auth.views import LoginView
from django.shortcuts import redirect
from django.urls import re_path as url
from swh.web.admin.adminurls import AdminUrls
import swh.web.admin.deposit # noqa
-import swh.web.admin.mailmap # noqa
import swh.web.admin.origin_save # noqa
from swh.web.config import is_feature_enabled
if is_feature_enabled("add_forge_now"):
import swh.web.admin.add_forge_now # noqa
def _admin_default_view(request):
return redirect("admin-origin-save-requests")
urlpatterns = [
url(r"^$", _admin_default_view, name="admin"),
url(r"^login/$", LoginView.as_view(template_name="login.html"), name="login"),
]
urlpatterns += AdminUrls.get_url_patterns()
diff --git a/swh/web/auth/migrations/0007_mailmap_django_app.py b/swh/web/auth/migrations/0007_mailmap_django_app.py
new file mode 100644
index 00000000..4efe79e8
--- /dev/null
+++ b/swh/web/auth/migrations/0007_mailmap_django_app.py
@@ -0,0 +1,29 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from django.db import migrations
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ("swh_web_auth", "0006_fix_mailmap_admin_user_id"),
+ ]
+
+ operations = [
+ # as we simply move the mailmap feature to a dedicated django application,
+ # we do not want to remove the tables in database to not lose data
+ migrations.SeparateDatabaseAndState(
+ state_operations=[
+ migrations.DeleteModel(
+ name="UserMailmap",
+ ),
+ migrations.DeleteModel(
+ name="UserMailmapEvent",
+ ),
+ ],
+ database_operations=[],
+ ),
+ ]
diff --git a/swh/web/auth/models.py b/swh/web/auth/models.py
index 331fe6cc..a3f5a085 100644
--- a/swh/web/auth/models.py
+++ b/swh/web/auth/models.py
@@ -1,123 +1,20 @@
# Copyright (C) 2020-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from django.db import models
class OIDCUserOfflineTokens(models.Model):
"""
Model storing encrypted bearer tokens generated by users.
"""
user_id = models.CharField(max_length=50)
creation_date = models.DateTimeField(auto_now_add=True)
offline_token = models.BinaryField()
class Meta:
app_label = "swh_web_auth"
db_table = "oidc_user_offline_tokens"
-
-
-class UserMailmapManager(models.Manager):
- """A queryset manager which defers all :class:`models.DateTimeField` fields, to avoid
- resetting them to an old value involuntarily."""
-
- @classmethod
- def deferred_fields(cls):
- try:
- return cls._deferred_fields
- except AttributeError:
- cls._deferred_fields = [
- field.name
- for field in UserMailmap._meta.get_fields()
- if isinstance(field, models.DateTimeField) and not field.auto_now
- ]
- return cls._deferred_fields
-
- def get_queryset(self):
- return super().get_queryset().defer(*self.deferred_fields())
-
-
-class UserMailmap(models.Model):
- """
- Model storing mailmap settings submitted by users.
- """
-
- user_id = models.CharField(max_length=50, null=True)
- """Optional user id from Keycloak"""
-
- from_email = models.TextField(unique=True, null=False)
- """Email address to find author in the archive"""
-
- from_email_verified = models.BooleanField(default=False)
- """Indicates if the from email has been verified"""
-
- from_email_verification_request_date = models.DateTimeField(null=True)
- """Last from email verification request date"""
-
- display_name = models.TextField(null=False)
- """Display name to use for the author instead of the archived one"""
-
- display_name_activated = models.BooleanField(default=False)
- """Indicates if the new display name should be used"""
-
- to_email = models.TextField(null=True)
- """Optional new email to use in the display name instead of the archived one"""
-
- to_email_verified = models.BooleanField(default=False)
- """Indicates if the to email has been verified"""
-
- to_email_verification_request_date = models.DateTimeField(null=True)
- """Last to email verification request date"""
-
- mailmap_last_processing_date = models.DateTimeField(null=True)
- """Last mailmap synchronisation date with swh-storage"""
-
- last_update_date = models.DateTimeField(auto_now=True)
- """Last date that mailmap model was updated"""
-
- class Meta:
- app_label = "swh_web_auth"
- db_table = "user_mailmap"
-
- # Defer _date fields by default to avoid updating them by mistake
- objects = UserMailmapManager()
-
- @property
- def full_display_name(self) -> str:
- if self.to_email is not None and self.to_email_verified:
- return "%s <%s>" % (self.display_name, self.to_email)
- else:
- return self.display_name
-
-
-class UserMailmapEvent(models.Model):
- """
- Represents an update to a mailmap object
- """
-
- timestamp = models.DateTimeField(auto_now=True, null=False)
- """Timestamp of the moment the event was submitted"""
-
- user_id = models.CharField(max_length=50, null=False)
- """User id from Keycloak of the user who changed the mailmap.
- (Not necessarily the one who the mail belongs to.)"""
-
- request_type = models.CharField(max_length=50, null=False)
- """Either ``add`` or ``update``."""
-
- request = models.TextField(null=False)
- """JSON dump of the request received."""
-
- successful = models.BooleanField(default=False, null=False)
- """If False, then the request failed or crashed before completing,
- and may or may not have altered the database's state."""
-
- class Meta:
- indexes = [
- models.Index(fields=["timestamp"]),
- ]
- app_label = "swh_web_auth"
- db_table = "user_mailmap_event"
diff --git a/swh/web/auth/views.py b/swh/web/auth/views.py
index bc2a2461..7d0715e2 100644
--- a/swh/web/auth/views.py
+++ b/swh/web/auth/views.py
@@ -1,197 +1,191 @@
# Copyright (C) 2020-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
from typing import Any, Dict, Union, cast
from cryptography.fernet import InvalidToken
from django.contrib.auth.decorators import login_required
from django.core.paginator import Paginator
from django.http import HttpRequest
from django.http.response import (
HttpResponse,
HttpResponseBadRequest,
HttpResponseForbidden,
HttpResponseRedirect,
JsonResponse,
)
from django.shortcuts import render
from django.urls import re_path as url
from django.views.decorators.http import require_http_methods
from swh.auth.django.models import OIDCUser
from swh.auth.django.utils import keycloak_oidc_client
from swh.auth.django.views import get_oidc_login_data, oidc_login_view
from swh.auth.django.views import urlpatterns as auth_urlpatterns
from swh.auth.keycloak import KeycloakError, keycloak_error_message
from swh.web.auth.models import OIDCUserOfflineTokens
from swh.web.auth.utils import decrypt_data, encrypt_data
from swh.web.common.exc import ForbiddenExc
from swh.web.common.utils import reverse
from swh.web.config import get_config
-from .mailmap import urlpatterns as mailmap_urlpatterns
-
def oidc_generate_bearer_token(request: HttpRequest) -> HttpResponse:
if not request.user.is_authenticated or not isinstance(request.user, OIDCUser):
return HttpResponseForbidden()
redirect_uri = reverse("oidc-generate-bearer-token-complete", request=request)
return oidc_login_view(
request, redirect_uri=redirect_uri, scope="openid offline_access"
)
def oidc_generate_bearer_token_complete(request: HttpRequest) -> HttpResponse:
if not request.user.is_authenticated or not isinstance(request.user, OIDCUser):
raise ForbiddenExc("You are not allowed to generate bearer tokens.")
if "error" in request.GET:
raise Exception(request.GET["error"])
login_data = get_oidc_login_data(request)
oidc_client = keycloak_oidc_client()
oidc_profile = oidc_client.authorization_code(
code=request.GET["code"],
code_verifier=login_data["code_verifier"],
redirect_uri=login_data["redirect_uri"],
)
user = cast(OIDCUser, request.user)
token = oidc_profile["refresh_token"]
secret = get_config()["secret_key"].encode()
salt = user.sub.encode()
encrypted_token = encrypt_data(token.encode(), secret, salt)
OIDCUserOfflineTokens.objects.create(
user_id=str(user.id), offline_token=encrypted_token
).save()
return HttpResponseRedirect(reverse("oidc-profile") + "#tokens")
def oidc_list_bearer_tokens(request: HttpRequest) -> HttpResponse:
if not request.user.is_authenticated or not isinstance(request.user, OIDCUser):
return HttpResponseForbidden()
tokens = OIDCUserOfflineTokens.objects.filter(user_id=str(request.user.id))
tokens = tokens.order_by("-creation_date")
length = int(request.GET["length"])
page = int(request.GET["start"]) / length + 1
paginator = Paginator(tokens, length)
tokens_data = [
{"id": t.id, "creation_date": t.creation_date.isoformat()}
for t in paginator.page(int(page)).object_list
]
table_data: Dict[str, Any] = {}
table_data["recordsTotal"] = len(tokens_data)
table_data["draw"] = int(request.GET["draw"])
table_data["data"] = tokens_data
table_data["recordsFiltered"] = len(tokens_data)
return JsonResponse(table_data)
def _encrypted_token_bytes(token: Union[bytes, memoryview]) -> bytes:
# token has been retrieved from a PosgreSQL database
if isinstance(token, memoryview):
return token.tobytes()
else:
return token
@require_http_methods(["POST"])
def oidc_get_bearer_token(request: HttpRequest) -> HttpResponse:
if not request.user.is_authenticated or not isinstance(request.user, OIDCUser):
return HttpResponseForbidden()
try:
data = json.loads(request.body.decode("ascii"))
user = cast(OIDCUser, request.user)
token_data = OIDCUserOfflineTokens.objects.get(id=data["token_id"])
secret = get_config()["secret_key"].encode()
salt = user.sub.encode()
decrypted_token = decrypt_data(
_encrypted_token_bytes(token_data.offline_token), secret, salt
)
refresh_token = decrypted_token.decode("ascii")
# check token is still valid
oidc_client = keycloak_oidc_client()
oidc_client.refresh_token(refresh_token)
return HttpResponse(refresh_token, content_type="text/plain")
except InvalidToken:
return HttpResponse(status=401)
except KeycloakError as ke:
error_msg = keycloak_error_message(ke)
if error_msg in (
"invalid_grant: Offline session not active",
"invalid_grant: Offline user session not found",
):
error_msg = "Bearer token has expired, please generate a new one."
return HttpResponseBadRequest(error_msg, content_type="text/plain")
@require_http_methods(["POST"])
def oidc_revoke_bearer_tokens(request: HttpRequest) -> HttpResponse:
if not request.user.is_authenticated or not isinstance(request.user, OIDCUser):
return HttpResponseForbidden()
try:
data = json.loads(request.body.decode("ascii"))
user = cast(OIDCUser, request.user)
for token_id in data["token_ids"]:
token_data = OIDCUserOfflineTokens.objects.get(id=token_id)
secret = get_config()["secret_key"].encode()
salt = user.sub.encode()
decrypted_token = decrypt_data(
_encrypted_token_bytes(token_data.offline_token), secret, salt
)
oidc_client = keycloak_oidc_client()
oidc_client.logout(decrypted_token.decode("ascii"))
token_data.delete()
return HttpResponse(status=200)
except InvalidToken:
return HttpResponse(status=401)
@login_required(login_url="/oidc/login/", redirect_field_name="next_path")
def _oidc_profile_view(request: HttpRequest) -> HttpResponse:
return render(request, "auth/profile.html")
-urlpatterns = (
- auth_urlpatterns
- + [
- url(
- r"^oidc/generate-bearer-token/$",
- oidc_generate_bearer_token,
- name="oidc-generate-bearer-token",
- ),
- url(
- r"^oidc/generate-bearer-token-complete/$",
- oidc_generate_bearer_token_complete,
- name="oidc-generate-bearer-token-complete",
- ),
- url(
- r"^oidc/list-bearer-token/$",
- oidc_list_bearer_tokens,
- name="oidc-list-bearer-tokens",
- ),
- url(
- r"^oidc/get-bearer-token/$",
- oidc_get_bearer_token,
- name="oidc-get-bearer-token",
- ),
- url(
- r"^oidc/revoke-bearer-tokens/$",
- oidc_revoke_bearer_tokens,
- name="oidc-revoke-bearer-tokens",
- ),
- url(
- r"^oidc/profile/$",
- _oidc_profile_view,
- name="oidc-profile",
- ),
- ]
- + mailmap_urlpatterns
-)
+urlpatterns = auth_urlpatterns + [
+ url(
+ r"^oidc/generate-bearer-token/$",
+ oidc_generate_bearer_token,
+ name="oidc-generate-bearer-token",
+ ),
+ url(
+ r"^oidc/generate-bearer-token-complete/$",
+ oidc_generate_bearer_token_complete,
+ name="oidc-generate-bearer-token-complete",
+ ),
+ url(
+ r"^oidc/list-bearer-token/$",
+ oidc_list_bearer_tokens,
+ name="oidc-list-bearer-tokens",
+ ),
+ url(
+ r"^oidc/get-bearer-token/$",
+ oidc_get_bearer_token,
+ name="oidc-get-bearer-token",
+ ),
+ url(
+ r"^oidc/revoke-bearer-tokens/$",
+ oidc_revoke_bearer_tokens,
+ name="oidc-revoke-bearer-tokens",
+ ),
+ url(
+ r"^oidc/profile/$",
+ _oidc_profile_view,
+ name="oidc-profile",
+ ),
+]
diff --git a/swh/web/config.py b/swh/web/config.py
index ff580254..dc8423e9 100644
--- a/swh/web/config.py
+++ b/swh/web/config.py
@@ -1,240 +1,241 @@
# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
from typing import Any, Dict
from swh.core import config
from swh.counters import get_counters
from swh.indexer.storage import get_indexer_storage
from swh.scheduler import get_scheduler
from swh.search import get_search
from swh.storage import get_storage
from swh.vault import get_vault
from swh.web import settings
SWH_WEB_SERVER_NAME = "archive.softwareheritage.org"
SWH_WEB_INTERNAL_SERVER_NAME = "archive.internal.softwareheritage.org"
SWH_WEB_STAGING_SERVER_NAMES = [
"webapp.staging.swh.network",
"webapp.internal.staging.swh.network",
]
SETTINGS_DIR = os.path.dirname(settings.__file__)
DEFAULT_CONFIG = {
"allowed_hosts": ("list", []),
"storage": (
"dict",
{
"cls": "remote",
"url": "http://127.0.0.1:5002/",
"timeout": 10,
},
),
"indexer_storage": (
"dict",
{
"cls": "remote",
"url": "http://127.0.0.1:5007/",
"timeout": 1,
},
),
"counters": (
"dict",
{
"cls": "remote",
"url": "http://127.0.0.1:5011/",
"timeout": 1,
},
),
"search": (
"dict",
{
"cls": "remote",
"url": "http://127.0.0.1:5010/",
"timeout": 10,
},
),
"search_config": (
"dict",
{
"metadata_backend": "swh-indexer-storage",
}, # or "swh-search"
),
"log_dir": ("string", "/tmp/swh/log"),
"debug": ("bool", False),
"serve_assets": ("bool", False),
"host": ("string", "127.0.0.1"),
"port": ("int", 5004),
"secret_key": ("string", "development key"),
# do not display code highlighting for content > 1MB
"content_display_max_size": ("int", 5 * 1024 * 1024),
"snapshot_content_max_size": ("int", 1000),
"throttling": (
"dict",
{
"cache_uri": None, # production: memcached as cache (127.0.0.1:11211)
# development: in-memory cache so None
"scopes": {
"swh_api": {
"limiter_rate": {"default": "120/h"},
"exempted_networks": ["127.0.0.0/8"],
},
"swh_api_origin_search": {
"limiter_rate": {"default": "10/m"},
"exempted_networks": ["127.0.0.0/8"],
},
"swh_vault_cooking": {
"limiter_rate": {"default": "120/h", "GET": "60/m"},
"exempted_networks": ["127.0.0.0/8"],
},
"swh_save_origin": {
"limiter_rate": {"default": "120/h", "POST": "10/h"},
"exempted_networks": ["127.0.0.0/8"],
},
"swh_api_origin_visit_latest": {
"limiter_rate": {"default": "700/m"},
"exempted_networks": ["127.0.0.0/8"],
},
},
},
),
"vault": (
"dict",
{
"cls": "remote",
"args": {
"url": "http://127.0.0.1:5005/",
},
},
),
"scheduler": ("dict", {"cls": "remote", "url": "http://127.0.0.1:5008/"}),
"development_db": ("string", os.path.join(SETTINGS_DIR, "db.sqlite3")),
"test_db": ("dict", {"name": "swh-web-test"}),
"production_db": ("dict", {"name": "swh-web"}),
"deposit": (
"dict",
{
"private_api_url": "https://deposit.softwareheritage.org/1/private/",
"private_api_user": "swhworker",
"private_api_password": "some-password",
},
),
"e2e_tests_mode": ("bool", False),
"es_workers_index_url": ("string", ""),
"history_counters_url": (
"string",
(
"http://counters1.internal.softwareheritage.org:5011"
"/counters_history/history.json"
),
),
"client_config": ("dict", {}),
"keycloak": ("dict", {"server_url": "", "realm_name": ""}),
"graph": (
"dict",
{
"server_url": "http://graph.internal.softwareheritage.org:5009/graph/",
"max_edges": {"staff": 0, "user": 100000, "anonymous": 1000},
},
),
"status": (
"dict",
{
"server_url": "https://status.softwareheritage.org/",
"json_path": "1.0/status/578e5eddcdc0cc7951000520",
},
),
"counters_backend": ("string", "swh-storage"), # or "swh-counters"
"staging_server_names": ("list", SWH_WEB_STAGING_SERVER_NAMES),
"instance_name": ("str", "archive-test.softwareheritage.org"),
"give": ("dict", {"public_key": "", "token": ""}),
"features": ("dict", {"add_forge_now": True}),
"add_forge_now": ("dict", {"email_address": "add-forge-now@example.com"}),
"swh_extra_django_apps": (
"list",
[
"swh.web.inbound_email",
"swh.web.add_forge_now",
+ "swh.web.mailmap",
],
),
}
swhweb_config: Dict[str, Any] = {}
def get_config(config_file="web/web"):
"""Read the configuration file `config_file`.
If an environment variable SWH_CONFIG_FILENAME is defined, this
takes precedence over the config_file parameter.
In any case, update the app with parameters (secret_key, conf)
and return the parsed configuration as a dict.
If no configuration file is provided, return a default
configuration.
"""
if not swhweb_config:
config_filename = os.environ.get("SWH_CONFIG_FILENAME")
if config_filename:
config_file = config_filename
cfg = config.load_named_config(config_file, DEFAULT_CONFIG)
swhweb_config.update(cfg)
config.prepare_folders(swhweb_config, "log_dir")
if swhweb_config.get("search"):
swhweb_config["search"] = get_search(**swhweb_config["search"])
else:
swhweb_config["search"] = None
swhweb_config["storage"] = get_storage(**swhweb_config["storage"])
swhweb_config["vault"] = get_vault(**swhweb_config["vault"])
swhweb_config["indexer_storage"] = get_indexer_storage(
**swhweb_config["indexer_storage"]
)
swhweb_config["scheduler"] = get_scheduler(**swhweb_config["scheduler"])
swhweb_config["counters"] = get_counters(**swhweb_config["counters"])
return swhweb_config
def search():
"""Return the current application's search."""
return get_config()["search"]
def storage():
"""Return the current application's storage."""
return get_config()["storage"]
def vault():
"""Return the current application's vault."""
return get_config()["vault"]
def indexer_storage():
"""Return the current application's indexer storage."""
return get_config()["indexer_storage"]
def scheduler():
"""Return the current application's scheduler."""
return get_config()["scheduler"]
def counters():
"""Return the current application's counters."""
return get_config()["counters"]
def is_feature_enabled(feature_name: str) -> bool:
"""Determine whether a feature is enabled or not. If feature_name is not found at all,
it's considered disabled.
"""
return get_config()["features"].get(feature_name, False)
diff --git a/swh/web/mailmap/__init__.py b/swh/web/mailmap/__init__.py
new file mode 100644
index 00000000..b0221e82
--- /dev/null
+++ b/swh/web/mailmap/__init__.py
@@ -0,0 +1,6 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+default_app_config = "swh.web.mailmap.apps.MailmapConfig"
diff --git a/swh/web/mailmap/apps.py b/swh/web/mailmap/apps.py
new file mode 100644
index 00000000..58f4cb8b
--- /dev/null
+++ b/swh/web/mailmap/apps.py
@@ -0,0 +1,11 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from django.apps import AppConfig
+
+
+class MailmapConfig(AppConfig):
+ name = "swh.web.mailmap"
+ label = "swh_web_mailmap"
diff --git a/swh/web/auth/management/__init__.py b/swh/web/mailmap/management/__init__.py
similarity index 100%
copy from swh/web/auth/management/__init__.py
copy to swh/web/mailmap/management/__init__.py
diff --git a/swh/web/auth/management/commands/__init__.py b/swh/web/mailmap/management/commands/__init__.py
similarity index 100%
rename from swh/web/auth/management/commands/__init__.py
rename to swh/web/mailmap/management/commands/__init__.py
diff --git a/swh/web/auth/management/commands/sync_mailmaps.py b/swh/web/mailmap/management/commands/sync_mailmaps.py
similarity index 98%
rename from swh/web/auth/management/commands/sync_mailmaps.py
rename to swh/web/mailmap/management/commands/sync_mailmaps.py
index 00f109c1..42176644 100644
--- a/swh/web/auth/management/commands/sync_mailmaps.py
+++ b/swh/web/mailmap/management/commands/sync_mailmaps.py
@@ -1,118 +1,118 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import psycopg2
import psycopg2.extensions
from psycopg2.extras import execute_values
from django.core.management.base import BaseCommand
from django.db import transaction
from django.db.models import F
from django.db.models.query import QuerySet
from django.utils import timezone
-from swh.web.auth.models import UserMailmap
+from swh.web.mailmap.models import UserMailmap
DISABLE_MAILMAPS_QUERY = """\
UPDATE person
SET displayname = NULL
FROM (VALUES %s) AS emails (email)
WHERE person.email = emails.email
"""
REFRESH_MAILMAPS_QUERY = """\
UPDATE person
SET displayname = displaynames.displayname
FROM (VALUES %s) AS displaynames (email, displayname)
WHERE
person.email = displaynames.email
AND person.displayname IS DISTINCT FROM displaynames.displayname
"""
class Command(BaseCommand):
help = "Synchronize the mailmaps with swh.storage"
def add_arguments(self, parser):
parser.add_argument("storage_dbconn", type=str)
parser.add_argument(
"--perform",
action="store_true",
help="Perform actions (instead of the default dry-run)",
)
def disable_mailmaps(
self,
storage_db: psycopg2.extensions.connection,
mailmaps: "QuerySet[UserMailmap]",
):
"""Return the SQL to disable a set of mailmaps"""
execute_values(
storage_db.cursor(),
DISABLE_MAILMAPS_QUERY,
((mailmap.from_email.encode("utf-8"),) for mailmap in mailmaps),
)
def refresh_mailmaps(
self,
storage_db: psycopg2.extensions.connection,
mailmaps: "QuerySet[UserMailmap]",
):
execute_values(
storage_db.cursor(),
REFRESH_MAILMAPS_QUERY,
(
(
mailmap.from_email.encode("utf-8"),
mailmap.full_display_name.encode("utf-8"),
)
for mailmap in mailmaps
),
)
def handle(self, *args, **options):
verified_mailmaps = UserMailmap.objects.filter(from_email_verified=True)
# Always refresh display names for person entries with known emails
to_refresh = verified_mailmaps.filter(display_name_activated=True)
# Only remove display_names if they've been deactivated since they've last been
# processed
to_disable = verified_mailmaps.filter(
display_name_activated=False,
mailmap_last_processing_date__lt=F("last_update_date"),
)
process_start = timezone.now()
with transaction.atomic():
self.stdout.write(
"%d mailmaps to disable, %d mailmaps to refresh%s"
% (
to_disable.count(),
to_refresh.count(),
(" (dry run)" if not options["perform"] else ""),
)
)
with psycopg2.connect(options["storage_dbconn"]) as db:
self.disable_mailmaps(db, to_disable.select_for_update())
self.refresh_mailmaps(db, to_refresh.select_for_update())
if not options["perform"]:
db.rollback()
else:
db.commit()
if options["perform"]:
updated = to_disable.update(
mailmap_last_processing_date=process_start
) + to_refresh.update(mailmap_last_processing_date=process_start)
else:
updated = to_disable.count() + to_refresh.count()
self.stdout.write(
self.style.SUCCESS(f"Synced {updated} mailmaps to swh.storage database")
)
diff --git a/swh/web/mailmap/migrations/0001_initial.py b/swh/web/mailmap/migrations/0001_initial.py
new file mode 100644
index 00000000..a876816f
--- /dev/null
+++ b/swh/web/mailmap/migrations/0001_initial.py
@@ -0,0 +1,88 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+ initial = True
+
+ dependencies = [] # type: ignore
+
+ operations = [
+ migrations.SeparateDatabaseAndState(
+ # as we move the mailmap feature to a dedicated django application,
+ # no need to recreate database tables as they already exist
+ state_operations=[
+ migrations.CreateModel(
+ name="UserMailmap",
+ fields=[
+ (
+ "id",
+ models.AutoField(
+ auto_created=True,
+ primary_key=True,
+ serialize=False,
+ verbose_name="ID",
+ ),
+ ),
+ ("user_id", models.CharField(max_length=50, null=True)),
+ ("from_email", models.TextField(unique=True)),
+ ("from_email_verified", models.BooleanField(default=False)),
+ (
+ "from_email_verification_request_date",
+ models.DateTimeField(null=True),
+ ),
+ ("display_name", models.TextField()),
+ ("display_name_activated", models.BooleanField(default=False)),
+ ("to_email", models.TextField(null=True)),
+ ("to_email_verified", models.BooleanField(default=False)),
+ (
+ "to_email_verification_request_date",
+ models.DateTimeField(null=True),
+ ),
+ (
+ "mailmap_last_processing_date",
+ models.DateTimeField(null=True),
+ ),
+ ("last_update_date", models.DateTimeField(auto_now=True)),
+ ],
+ options={
+ "db_table": "user_mailmap",
+ },
+ ),
+ migrations.CreateModel(
+ name="UserMailmapEvent",
+ fields=[
+ (
+ "id",
+ models.AutoField(
+ auto_created=True,
+ primary_key=True,
+ serialize=False,
+ verbose_name="ID",
+ ),
+ ),
+ ("timestamp", models.DateTimeField(auto_now=True)),
+ ("user_id", models.CharField(max_length=50)),
+ ("request_type", models.CharField(max_length=50)),
+ ("request", models.TextField()),
+ ("successful", models.BooleanField(default=False)),
+ ],
+ options={
+ "db_table": "user_mailmap_event",
+ },
+ ),
+ migrations.AddIndex(
+ model_name="usermailmapevent",
+ index=models.Index(
+ fields=["timestamp"], name="user_mailma_timesta_1f7aef_idx"
+ ),
+ ),
+ ],
+ database_operations=[],
+ ),
+ ]
diff --git a/swh/web/auth/management/__init__.py b/swh/web/mailmap/migrations/__init__.py
similarity index 100%
copy from swh/web/auth/management/__init__.py
copy to swh/web/mailmap/migrations/__init__.py
diff --git a/swh/web/auth/models.py b/swh/web/mailmap/models.py
similarity index 89%
copy from swh/web/auth/models.py
copy to swh/web/mailmap/models.py
index 331fe6cc..ec779159 100644
--- a/swh/web/auth/models.py
+++ b/swh/web/mailmap/models.py
@@ -1,123 +1,109 @@
# Copyright (C) 2020-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from django.db import models
-class OIDCUserOfflineTokens(models.Model):
- """
- Model storing encrypted bearer tokens generated by users.
- """
-
- user_id = models.CharField(max_length=50)
- creation_date = models.DateTimeField(auto_now_add=True)
- offline_token = models.BinaryField()
-
- class Meta:
- app_label = "swh_web_auth"
- db_table = "oidc_user_offline_tokens"
-
-
class UserMailmapManager(models.Manager):
"""A queryset manager which defers all :class:`models.DateTimeField` fields, to avoid
resetting them to an old value involuntarily."""
@classmethod
def deferred_fields(cls):
try:
return cls._deferred_fields
except AttributeError:
cls._deferred_fields = [
field.name
for field in UserMailmap._meta.get_fields()
if isinstance(field, models.DateTimeField) and not field.auto_now
]
return cls._deferred_fields
def get_queryset(self):
return super().get_queryset().defer(*self.deferred_fields())
class UserMailmap(models.Model):
"""
Model storing mailmap settings submitted by users.
"""
user_id = models.CharField(max_length=50, null=True)
"""Optional user id from Keycloak"""
from_email = models.TextField(unique=True, null=False)
"""Email address to find author in the archive"""
from_email_verified = models.BooleanField(default=False)
"""Indicates if the from email has been verified"""
from_email_verification_request_date = models.DateTimeField(null=True)
"""Last from email verification request date"""
display_name = models.TextField(null=False)
"""Display name to use for the author instead of the archived one"""
display_name_activated = models.BooleanField(default=False)
"""Indicates if the new display name should be used"""
to_email = models.TextField(null=True)
"""Optional new email to use in the display name instead of the archived one"""
to_email_verified = models.BooleanField(default=False)
"""Indicates if the to email has been verified"""
to_email_verification_request_date = models.DateTimeField(null=True)
"""Last to email verification request date"""
mailmap_last_processing_date = models.DateTimeField(null=True)
"""Last mailmap synchronisation date with swh-storage"""
last_update_date = models.DateTimeField(auto_now=True)
"""Last date that mailmap model was updated"""
class Meta:
- app_label = "swh_web_auth"
+ app_label = "swh_web_mailmap"
db_table = "user_mailmap"
# Defer _date fields by default to avoid updating them by mistake
objects = UserMailmapManager()
@property
def full_display_name(self) -> str:
if self.to_email is not None and self.to_email_verified:
return "%s <%s>" % (self.display_name, self.to_email)
else:
return self.display_name
class UserMailmapEvent(models.Model):
"""
Represents an update to a mailmap object
"""
timestamp = models.DateTimeField(auto_now=True, null=False)
"""Timestamp of the moment the event was submitted"""
user_id = models.CharField(max_length=50, null=False)
"""User id from Keycloak of the user who changed the mailmap.
(Not necessarily the one who the mail belongs to.)"""
request_type = models.CharField(max_length=50, null=False)
"""Either ``add`` or ``update``."""
request = models.TextField(null=False)
"""JSON dump of the request received."""
successful = models.BooleanField(default=False, null=False)
"""If False, then the request failed or crashed before completing,
and may or may not have altered the database's state."""
class Meta:
indexes = [
models.Index(fields=["timestamp"]),
]
- app_label = "swh_web_auth"
+ app_label = "swh_web_mailmap"
db_table = "user_mailmap_event"
diff --git a/swh/web/templates/admin/mailmap.html b/swh/web/mailmap/templates/admin/mailmap.html
similarity index 100%
rename from swh/web/templates/admin/mailmap.html
rename to swh/web/mailmap/templates/admin/mailmap.html
diff --git a/swh/web/mailmap/urls.py b/swh/web/mailmap/urls.py
new file mode 100644
index 00000000..aa0e5cdd
--- /dev/null
+++ b/swh/web/mailmap/urls.py
@@ -0,0 +1,38 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from django.urls import re_path as url
+
+from swh.web.mailmap.views import (
+ admin_mailmap,
+ profile_add_mailmap,
+ profile_list_mailmap,
+ profile_list_mailmap_datatables,
+ profile_update_mailmap,
+)
+
+urlpatterns = [
+ url(
+ r"^profile/mailmap/list/$",
+ profile_list_mailmap,
+ name="profile-mailmap-list",
+ ),
+ url(
+ r"^profile/mailmap/add/$",
+ profile_add_mailmap,
+ name="profile-mailmap-add",
+ ),
+ url(
+ r"^profile/mailmap/update/$",
+ profile_update_mailmap,
+ name="profile-mailmap-update",
+ ),
+ url(
+ r"^profile/mailmap/list/datatables/$",
+ profile_list_mailmap_datatables,
+ name="profile-mailmap-list-datatables",
+ ),
+ url(r"^admin/mailmap/$", admin_mailmap, name="admin-mailmap"),
+]
diff --git a/swh/web/auth/mailmap.py b/swh/web/mailmap/views.py
similarity index 89%
rename from swh/web/auth/mailmap.py
rename to swh/web/mailmap/views.py
index b3f14a3c..1cbc82fb 100644
--- a/swh/web/auth/mailmap.py
+++ b/swh/web/mailmap/views.py
@@ -1,204 +1,186 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
from typing import Any, Dict
+from django.contrib.auth.decorators import permission_required
from django.core.paginator import Paginator
from django.db import IntegrityError
from django.db.models import Q
from django.http.request import HttpRequest
from django.http.response import (
HttpResponse,
HttpResponseBadRequest,
HttpResponseNotFound,
JsonResponse,
)
-from django.urls import re_path as url
+from django.shortcuts import render
from rest_framework import serializers
from rest_framework.decorators import api_view
from rest_framework.request import Request
from rest_framework.response import Response
-from swh.web.auth.models import UserMailmap, UserMailmapEvent
from swh.web.auth.utils import (
MAILMAP_ADMIN_PERMISSION,
MAILMAP_PERMISSION,
any_permission_required,
)
+from swh.web.mailmap.models import UserMailmap, UserMailmapEvent
class UserMailmapSerializer(serializers.ModelSerializer):
class Meta:
model = UserMailmap
fields = "__all__"
@api_view(["GET"])
@any_permission_required(MAILMAP_PERMISSION, MAILMAP_ADMIN_PERMISSION)
def profile_list_mailmap(request: Request) -> HttpResponse:
mailmap_admin = request.user.has_perm(MAILMAP_ADMIN_PERMISSION)
mms = UserMailmap.objects.filter(
user_id=None if mailmap_admin else str(request.user.id)
).all()
return Response(UserMailmapSerializer(mms, many=True).data)
@api_view(["POST"])
@any_permission_required(MAILMAP_PERMISSION, MAILMAP_ADMIN_PERMISSION)
def profile_add_mailmap(request: Request) -> HttpResponse:
mailmap_admin = request.user.has_perm(MAILMAP_ADMIN_PERMISSION)
event = UserMailmapEvent.objects.create(
user_id=str(request.user.id),
request_type="add",
request=json.dumps(request.data),
)
from_email = request.data.pop("from_email", None)
if not from_email:
return HttpResponseBadRequest(
"'from_email' must be provided and non-empty.", content_type="text/plain"
)
user_id = None if mailmap_admin else str(request.user.id)
from_email_verified = request.data.pop("from_email_verified", False)
if mailmap_admin:
# consider email verified when mailmap is added by admin
from_email_verified = True
try:
UserMailmap.objects.create(
user_id=user_id,
from_email=from_email,
from_email_verified=from_email_verified,
**request.data,
)
except IntegrityError as e:
if (
"user_mailmap_from_email_key" in e.args[0]
or "user_mailmap.from_email" in e.args[0]
):
return HttpResponseBadRequest(
"This 'from_email' already exists.", content_type="text/plain"
)
else:
raise
event.successful = True
event.save()
mm = UserMailmap.objects.get(user_id=user_id, from_email=from_email)
return Response(UserMailmapSerializer(mm).data)
@api_view(["POST"])
@any_permission_required(MAILMAP_PERMISSION, MAILMAP_ADMIN_PERMISSION)
def profile_update_mailmap(request: Request) -> HttpResponse:
mailmap_admin = request.user.has_perm(MAILMAP_ADMIN_PERMISSION)
event = UserMailmapEvent.objects.create(
user_id=str(request.user.id),
request_type="update",
request=json.dumps(request.data),
)
from_email = request.data.pop("from_email", None)
if not from_email:
return HttpResponseBadRequest(
"'from_email' must be provided and non-empty.", content_type="text/plain"
)
user_id = None if mailmap_admin else str(request.user.id)
try:
to_update = (
UserMailmap.objects.filter(user_id=user_id)
.filter(from_email=from_email)
.get()
)
except UserMailmap.DoesNotExist:
return HttpResponseNotFound("'from_email' cannot be found in mailmaps.")
for attr, value in request.data.items():
setattr(to_update, attr, value)
to_update.save()
event.successful = True
event.save()
mm = UserMailmap.objects.get(user_id=user_id, from_email=from_email)
return Response(UserMailmapSerializer(mm).data)
@any_permission_required(MAILMAP_PERMISSION, MAILMAP_ADMIN_PERMISSION)
def profile_list_mailmap_datatables(request: HttpRequest) -> HttpResponse:
mailmap_admin = request.user.has_perm(MAILMAP_ADMIN_PERMISSION)
mailmaps = UserMailmap.objects.filter(
user_id=None if mailmap_admin else str(request.user.id)
)
search_value = request.GET.get("search[value]", "")
column_order = request.GET.get("order[0][column]")
field_order = request.GET.get(f"columns[{column_order}][name]", "from_email")
order_dir = request.GET.get("order[0][dir]", "asc")
if order_dir == "desc":
field_order = "-" + field_order
mailmaps = mailmaps.order_by(field_order)
table_data: Dict[str, Any] = {}
table_data["draw"] = int(request.GET.get("draw", 1))
table_data["recordsTotal"] = mailmaps.count()
length = int(request.GET.get("length", 10))
page = int(request.GET.get("start", 0)) / length + 1
if search_value:
mailmaps = mailmaps.filter(
Q(from_email__icontains=search_value)
| Q(display_name__icontains=search_value)
)
table_data["recordsFiltered"] = mailmaps.count()
paginator = Paginator(mailmaps, length)
mailmaps_data = [
UserMailmapSerializer(mm).data for mm in paginator.page(int(page)).object_list
]
table_data["data"] = mailmaps_data
return JsonResponse(table_data)
-urlpatterns = [
- url(
- r"^profile/mailmap/list/$",
- profile_list_mailmap,
- name="profile-mailmap-list",
- ),
- url(
- r"^profile/mailmap/add/$",
- profile_add_mailmap,
- name="profile-mailmap-add",
- ),
- url(
- r"^profile/mailmap/update/$",
- profile_update_mailmap,
- name="profile-mailmap-update",
- ),
- url(
- r"^profile/mailmap/list/datatables/$",
- profile_list_mailmap_datatables,
- name="profile-mailmap-list-datatables",
- ),
-]
+@permission_required(MAILMAP_ADMIN_PERMISSION)
+def admin_mailmap(request):
+ return render(request, "admin/mailmap.html")
diff --git a/swh/web/templates/layout.html b/swh/web/templates/layout.html
index 19cfb567..716708f9 100644
--- a/swh/web/templates/layout.html
+++ b/swh/web/templates/layout.html
@@ -1,313 +1,313 @@
{% comment %}
Copyright (C) 2015-2022 The Software Heritage developers
See the AUTHORS file at the top-level directory of this distribution
License: GNU Affero General Public License version 3, or any later version
See top-level LICENSE file for more information
{% endcomment %}
{% load js_reverse %}
{% load static %}
{% load render_bundle from webpack_loader %}
{% load swh_templatetags %}
Operational
{% url 'logout' as logout_url %}
{% if user.is_authenticated %}
Logged in as
{% if 'OIDC' in user.backend %}
{{ user.username }},
logout
{% else %}
{{ user.username }},
logout
{% endif %}
{% elif oidc_enabled %}
{% if request.path != logout_url %}
login
{% else %}
login
{% endif %}
{% else %}
{% if request.path != logout_url %}
login
{% else %}
login
{% endif %}
{% endif %}
{% include "misc/hiring-banner.html" %}
{% if swh_web_staging %}
Staging v{{ swh_web_version }}
{% elif swh_web_dev %}
Development v{{ swh_web_version|split:"+"|first }}
{% endif %}
{% block content %}{% endblock %}
{% include "includes/global-modals.html" %}
diff --git a/swh/web/tests/auth/test_migrations.py b/swh/web/tests/auth/test_migrations.py
index 331539d5..18e75c76 100644
--- a/swh/web/tests/auth/test_migrations.py
+++ b/swh/web/tests/auth/test_migrations.py
@@ -1,38 +1,54 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime
+import pytest
+
APP_NAME = "swh_web_auth"
MIGRATION_0005 = "0005_usermailmapevent"
MIGRATION_0006 = "0006_fix_mailmap_admin_user_id"
+MIGRATION_0007 = "0007_mailmap_django_app"
def test_fix_mailmap_admin_user_id(migrator):
state = migrator.apply_tested_migration((APP_NAME, MIGRATION_0005))
UserMailmap = state.apps.get_model(APP_NAME, "UserMailmap")
user_id = "45"
UserMailmap.objects.create(
user_id=user_id,
from_email="user@example.org",
from_email_verified=True,
display_name="New display name",
)
UserMailmap.objects.filter(user_id=user_id).update(
last_update_date=datetime(2022, 2, 11, 14, 16, 13, 614000)
)
assert UserMailmap.objects.filter(user_id=user_id).count() == 1
assert UserMailmap.objects.filter(user_id=None).count() == 0
state = migrator.apply_tested_migration((APP_NAME, MIGRATION_0006))
UserMailmap = state.apps.get_model(APP_NAME, "UserMailmap")
assert UserMailmap.objects.filter(user_id=user_id).count() == 0
assert UserMailmap.objects.filter(user_id=None).count() == 1
+
+
+def test_mailmap_django_app(migrator):
+ state = migrator.apply_tested_migration((APP_NAME, MIGRATION_0006))
+ UserMailmap = state.apps.get_model(APP_NAME, "UserMailmap")
+ assert UserMailmap
+
+ # UserMailmap model moved to swh_web_mailmap django application
+ state = migrator.apply_tested_migration((APP_NAME, MIGRATION_0007))
+ with pytest.raises(
+ LookupError, match="App 'swh_web_auth' doesn't have a 'UserMailmap' model."
+ ):
+ state.apps.get_model(APP_NAME, "UserMailmap")
diff --git a/swh/web/tests/conftest.py b/swh/web/tests/conftest.py
index 6d0e93c9..ca320b27 100644
--- a/swh/web/tests/conftest.py
+++ b/swh/web/tests/conftest.py
@@ -1,1239 +1,1240 @@
# Copyright (C) 2018-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import defaultdict
from datetime import timedelta
import functools
from importlib import import_module, reload
import json
import os
import random
import shutil
import sys
import time
from typing import Any, Dict, List, Optional
from _pytest.python import Function
-from hypothesis import HealthCheck, settings
+from hypothesis import HealthCheck
+from hypothesis import settings as hypothesis_settings
import pytest
from pytest_django.fixtures import SettingsWrapper
from django.contrib.auth.models import User
from django.core.cache import cache
from django.test.utils import setup_databases
from django.urls import clear_url_caches
from rest_framework.test import APIClient, APIRequestFactory
from swh.model.hashutil import (
ALGORITHMS,
DEFAULT_ALGORITHMS,
hash_to_bytes,
hash_to_hex,
)
from swh.model.model import Content, Directory
from swh.model.swhids import CoreSWHID, ObjectType
from swh.scheduler.tests.common import TASK_TYPES
from swh.storage.algos.origin import origin_get_latest_visit_status
from swh.storage.algos.revisions_walker import get_revisions_walker
from swh.storage.algos.snapshot import snapshot_get_all_branches, snapshot_get_latest
from swh.web.auth.utils import (
ADD_FORGE_MODERATOR_PERMISSION,
MAILMAP_ADMIN_PERMISSION,
MAILMAP_PERMISSION,
OIDC_SWH_WEB_CLIENT_ID,
)
from swh.web.common import converters
from swh.web.common.origin_save import get_scheduler_load_task_types
from swh.web.common.typing import OriginVisitInfo
from swh.web.common.utils import browsers_supported_image_mimes
from swh.web.config import get_config
from swh.web.tests.data import (
get_tests_data,
override_storages,
random_content,
random_sha1,
random_sha1_bytes,
random_sha256,
)
from swh.web.tests.utils import create_django_permission
os.environ["LC_ALL"] = "C.UTF-8"
fossology_missing = shutil.which("nomossa") is None
# Register some hypothesis profiles
-settings.register_profile("default", settings())
+hypothesis_settings.register_profile("default", hypothesis_settings())
# we use getattr here to keep mypy happy regardless hypothesis version
function_scoped_fixture_check = (
[getattr(HealthCheck, "function_scoped_fixture")]
if hasattr(HealthCheck, "function_scoped_fixture")
else []
)
suppress_health_check = [
HealthCheck.too_slow,
HealthCheck.filter_too_much,
] + function_scoped_fixture_check
-settings.register_profile(
+hypothesis_settings.register_profile(
"swh-web",
- settings(
+ hypothesis_settings(
deadline=None,
suppress_health_check=suppress_health_check,
),
)
-settings.register_profile(
+hypothesis_settings.register_profile(
"swh-web-fast",
- settings(
+ hypothesis_settings(
deadline=None,
max_examples=5,
suppress_health_check=suppress_health_check,
),
)
def pytest_addoption(parser):
parser.addoption("--swh-web-random-seed", action="store", default=None)
def pytest_configure(config):
# Use fast hypothesis profile by default if none has been
# explicitly specified in pytest option
if config.getoption("--hypothesis-profile") is None:
- settings.load_profile("swh-web-fast")
+ hypothesis_settings.load_profile("swh-web-fast")
# Small hack in order to be able to run the unit tests
# without static assets generated by webpack.
# Those assets are not really needed for the Python tests
# but the django templates will fail to load due to missing
# generated file webpack-stats.json describing the js and css
# files to include.
# So generate a dummy webpack-stats.json file to overcome
# that issue.
test_dir = os.path.dirname(__file__)
# location of the static folder when running tests through tox
data_dir = os.path.join(sys.prefix, "share/swh/web")
static_dir = os.path.join(data_dir, "static")
if not os.path.exists(static_dir):
# location of the static folder when running tests locally with pytest
static_dir = os.path.join(test_dir, "../../../static")
webpack_stats = os.path.join(static_dir, "webpack-stats.json")
if os.path.exists(webpack_stats):
return
bundles_dir = os.path.join(test_dir, "../../../assets/src/bundles")
if not os.path.exists(bundles_dir):
# location of the bundles folder when running tests with tox
bundles_dir = os.path.join(data_dir, "assets/src/bundles")
_, bundles, _ = next(os.walk(bundles_dir))
mock_webpack_stats = {
"status": "done",
"publicPath": "/static",
"chunks": {},
"assets": {},
}
for bundle in bundles:
asset = f"js/{bundle}.js"
mock_webpack_stats["chunks"][bundle] = [asset]
mock_webpack_stats["assets"][asset] = {
"name": asset,
"publicPath": f"/static/{asset}",
}
with open(webpack_stats, "w") as outfile:
json.dump(mock_webpack_stats, outfile)
_swh_web_custom_section = "swh-web custom section"
_random_seed_cache_key = "swh-web/random-seed"
@pytest.fixture(scope="function", autouse=True)
def random_seed(pytestconfig):
state = random.getstate()
seed = pytestconfig.getoption("--swh-web-random-seed")
if seed is None:
seed = time.time()
seed = int(seed)
cache.set(_random_seed_cache_key, seed)
random.seed(seed)
yield seed
random.setstate(state)
def pytest_report_teststatus(report, *args):
if report.when == "call" and report.outcome == "failed":
seed = cache.get(_random_seed_cache_key, None)
line = (
f'FAILED {report.nodeid}: Use "pytest --swh-web-random-seed={seed} '
f'{report.nodeid}" to reproduce that test failure with same inputs'
)
report.sections.append((_swh_web_custom_section, line))
def pytest_terminal_summary(terminalreporter, *args):
reports = terminalreporter.getreports("failed")
content = os.linesep.join(
text
for report in reports
for secname, text in report.sections
if secname == _swh_web_custom_section
)
if content:
terminalreporter.ensure_newline()
terminalreporter.section(_swh_web_custom_section, sep="-", blue=True, bold=True)
terminalreporter.line(content)
# Clear Django cache before each test
@pytest.fixture(autouse=True)
def django_cache_cleared():
cache.clear()
# Alias rf fixture from pytest-django
@pytest.fixture
def request_factory(rf):
return rf
# Fixture to get test client from Django REST Framework
@pytest.fixture
def api_client():
return APIClient()
# Fixture to get API request factory from Django REST Framework
@pytest.fixture
def api_request_factory():
return APIRequestFactory()
# Initialize tests data
@pytest.fixture(scope="function", autouse=True)
def tests_data():
data = get_tests_data(reset=True)
# Update swh-web configuration to use the in-memory storages
# instantiated in the tests.data module
override_storages(
data["storage"], data["idx_storage"], data["search"], data["counters"]
)
return data
@pytest.fixture(scope="function")
def sha1():
"""Fixture returning a valid hexadecimal sha1 value."""
return random_sha1()
@pytest.fixture(scope="function")
def invalid_sha1():
"""Fixture returning an invalid sha1 representation."""
return hash_to_hex(bytes(random.randint(0, 255) for _ in range(50)))
@pytest.fixture(scope="function")
def sha256():
"""Fixture returning a valid hexadecimal sha256 value."""
return random_sha256()
def _known_swh_objects(tests_data, object_type):
return tests_data[object_type]
@pytest.fixture(scope="function")
def content(tests_data):
"""Fixture returning a random content ingested into the test archive."""
return random.choice(_known_swh_objects(tests_data, "contents"))
@pytest.fixture(scope="function")
def contents(tests_data):
"""Fixture returning random contents ingested into the test archive."""
return random.choices(
_known_swh_objects(tests_data, "contents"), k=random.randint(2, 8)
)
def _new_content(tests_data):
while True:
new_content = random_content()
sha1_bytes = hash_to_bytes(new_content["sha1"])
if tests_data["storage"].content_get_data(sha1_bytes) is None:
return new_content
@pytest.fixture(scope="function")
def unknown_content(tests_data):
"""Fixture returning a random content not ingested into the test archive."""
return _new_content(tests_data)
@pytest.fixture(scope="function")
def unknown_contents(tests_data):
"""Fixture returning random contents not ingested into the test archive."""
new_contents = []
new_content_ids = set()
nb_contents = random.randint(2, 8)
while len(new_contents) != nb_contents:
new_content = _new_content(tests_data)
if new_content["sha1"] not in new_content_ids:
new_contents.append(new_content)
new_content_ids.add(new_content["sha1"])
return list(new_contents)
@pytest.fixture(scope="function")
def empty_content():
"""Fixture returning the empty content ingested into the test archive."""
empty_content = Content.from_data(data=b"").to_dict()
for algo in DEFAULT_ALGORITHMS:
empty_content[algo] = hash_to_hex(empty_content[algo])
return empty_content
@functools.lru_cache(maxsize=None)
def _content_text():
return list(
filter(
lambda c: c["mimetype"].startswith("text/"),
_known_swh_objects(get_tests_data(), "contents"),
)
)
@pytest.fixture(scope="function")
def content_text():
"""
Fixture returning a random textual content ingested into the test archive.
"""
return random.choice(_content_text())
@functools.lru_cache(maxsize=None)
def _content_text_non_utf8():
return list(
filter(
lambda c: c["mimetype"].startswith("text/")
and c["encoding"] not in ("utf-8", "us-ascii"),
_known_swh_objects(get_tests_data(), "contents"),
)
)
@pytest.fixture(scope="function")
def content_text_non_utf8():
"""Fixture returning a random textual content not encoded to UTF-8 ingested
into the test archive.
"""
return random.choice(_content_text_non_utf8())
@functools.lru_cache(maxsize=None)
def _content_application_no_highlight():
return list(
filter(
lambda c: c["mimetype"].startswith("application/")
and c["hljs_language"] == "plaintext",
_known_swh_objects(get_tests_data(), "contents"),
)
)
@pytest.fixture(scope="function")
def content_application_no_highlight():
"""Fixture returning a random textual content with mimetype
starting with application/ and no detected programming language to
highlight ingested into the test archive.
"""
return random.choice(_content_application_no_highlight())
@functools.lru_cache(maxsize=None)
def _content_text_no_highlight():
return list(
filter(
lambda c: c["mimetype"].startswith("text/")
and c["hljs_language"] == "plaintext",
_known_swh_objects(get_tests_data(), "contents"),
)
)
@pytest.fixture(scope="function")
def content_text_no_highlight():
"""Fixture returning a random textual content with no detected
programming language to highlight ingested into the test archive.
"""
return random.choice(_content_text_no_highlight())
@functools.lru_cache(maxsize=None)
def _content_image_type():
return list(
filter(
lambda c: c["mimetype"] in browsers_supported_image_mimes,
_known_swh_objects(get_tests_data(), "contents"),
)
)
@pytest.fixture(scope="function")
def content_image_type():
"""Fixture returning a random image content ingested into the test archive."""
return random.choice(_content_image_type())
@functools.lru_cache(maxsize=None)
def _content_unsupported_image_type_rendering():
return list(
filter(
lambda c: c["mimetype"].startswith("image/")
and c["mimetype"] not in browsers_supported_image_mimes,
_known_swh_objects(get_tests_data(), "contents"),
)
)
@pytest.fixture(scope="function")
def content_unsupported_image_type_rendering():
"""Fixture returning a random image content ingested into the test archive that
can not be rendered by browsers.
"""
return random.choice(_content_unsupported_image_type_rendering())
@functools.lru_cache(maxsize=None)
def _content_utf8_detected_as_binary():
def utf8_binary_detected(content):
if content["encoding"] != "binary":
return False
try:
content["raw_data"].decode("utf-8")
except Exception:
return False
else:
return True
return list(
filter(utf8_binary_detected, _known_swh_objects(get_tests_data(), "contents"))
)
@pytest.fixture(scope="function")
def content_utf8_detected_as_binary():
"""Fixture returning a random textual content detected as binary
by libmagic while they are valid UTF-8 encoded files.
"""
return random.choice(_content_utf8_detected_as_binary())
@pytest.fixture(scope="function")
def directory(tests_data):
"""Fixture returning a random directory ingested into the test archive."""
return random.choice(_known_swh_objects(tests_data, "directories"))
@functools.lru_cache(maxsize=None)
def _directory_with_entry_type(type_):
tests_data = get_tests_data()
return list(
filter(
lambda d: any(
[
e["type"] == type_
for e in list(tests_data["storage"].directory_ls(hash_to_bytes(d)))
]
),
_known_swh_objects(tests_data, "directories"),
)
)
@pytest.fixture(scope="function")
def directory_with_subdirs():
"""Fixture returning a random directory containing sub directories ingested
into the test archive.
"""
return random.choice(_directory_with_entry_type("dir"))
@pytest.fixture(scope="function")
def directory_with_files():
"""Fixture returning a random directory containing at least one regular file."""
return random.choice(_directory_with_entry_type("file"))
@pytest.fixture(scope="function")
def unknown_directory(tests_data):
"""Fixture returning a random directory not ingested into the test archive."""
while True:
new_directory = random_sha1()
sha1_bytes = hash_to_bytes(new_directory)
if list(tests_data["storage"].directory_missing([sha1_bytes])):
return new_directory
@pytest.fixture(scope="function")
def empty_directory():
"""Fixture returning the empty directory ingested into the test archive."""
return Directory(entries=()).id.hex()
@pytest.fixture(scope="function")
def revision(tests_data):
"""Fixturereturning a random revision ingested into the test archive."""
return random.choice(_known_swh_objects(tests_data, "revisions"))
@pytest.fixture(scope="function")
def revisions(tests_data):
"""Fixture returning random revisions ingested into the test archive."""
return random.choices(
_known_swh_objects(tests_data, "revisions"),
k=random.randint(2, 8),
)
@pytest.fixture(scope="function")
def revisions_list(tests_data):
"""Fixture returning random revisions ingested into the test archive."""
def gen_revisions_list(size):
return random.choices(
_known_swh_objects(tests_data, "revisions"),
k=size,
)
return gen_revisions_list
@pytest.fixture(scope="function")
def unknown_revision(tests_data):
"""Fixture returning a random revision not ingested into the test archive."""
while True:
new_revision = random_sha1()
sha1_bytes = hash_to_bytes(new_revision)
if tests_data["storage"].revision_get([sha1_bytes])[0] is None:
return new_revision
def _get_origin_dfs_revisions_walker(tests_data):
storage = tests_data["storage"]
origin = random.choice(tests_data["origins"][:-1])
snapshot = snapshot_get_latest(storage, origin["url"])
if snapshot.branches[b"HEAD"].target_type.value == "alias":
target = snapshot.branches[b"HEAD"].target
head = snapshot.branches[target].target
else:
head = snapshot.branches[b"HEAD"].target
return get_revisions_walker("dfs", storage, head)
@functools.lru_cache(maxsize=None)
def _ancestor_revisions_data():
# get a dfs revisions walker for one of the origins
# loaded into the test archive
revisions_walker = _get_origin_dfs_revisions_walker(get_tests_data())
master_revisions = []
children = defaultdict(list)
init_rev_found = False
# get revisions only authored in the master branch
for rev in revisions_walker:
for rev_p in rev["parents"]:
children[rev_p].append(rev["id"])
if not init_rev_found:
master_revisions.append(rev)
if not rev["parents"]:
init_rev_found = True
return master_revisions, children
@pytest.fixture(scope="function")
def ancestor_revisions():
"""Fixture returning a pair of revisions ingested into the test archive
with an ancestor relation.
"""
master_revisions, children = _ancestor_revisions_data()
# head revision
root_rev = master_revisions[0]
# pick a random revision, different from head, only authored
# in the master branch
ancestor_rev_idx = random.choice(list(range(1, len(master_revisions) - 1)))
ancestor_rev = master_revisions[ancestor_rev_idx]
ancestor_child_revs = children[ancestor_rev["id"]]
return {
"sha1_git_root": hash_to_hex(root_rev["id"]),
"sha1_git": hash_to_hex(ancestor_rev["id"]),
"children": [hash_to_hex(r) for r in ancestor_child_revs],
}
@functools.lru_cache(maxsize=None)
def _non_ancestor_revisions_data():
# get a dfs revisions walker for one of the origins
# loaded into the test archive
revisions_walker = _get_origin_dfs_revisions_walker(get_tests_data())
merge_revs = []
children = defaultdict(list)
# get all merge revisions
for rev in revisions_walker:
if len(rev["parents"]) > 1:
merge_revs.append(rev)
for rev_p in rev["parents"]:
children[rev_p].append(rev["id"])
return merge_revs, children
@pytest.fixture(scope="function")
def non_ancestor_revisions():
"""Fixture returning a pair of revisions ingested into the test archive
with no ancestor relation.
"""
merge_revs, children = _non_ancestor_revisions_data()
# find a merge revisions whose parents have a unique child revision
random.shuffle(merge_revs)
selected_revs = None
for merge_rev in merge_revs:
if all(len(children[rev_p]) == 1 for rev_p in merge_rev["parents"]):
selected_revs = merge_rev["parents"]
return {
"sha1_git_root": hash_to_hex(selected_revs[0]),
"sha1_git": hash_to_hex(selected_revs[1]),
}
@pytest.fixture(scope="function")
def revision_with_submodules():
"""Fixture returning a revision that is known to
point to a directory with revision entries (aka git submodules)
"""
return {
"rev_sha1_git": "ffcb69001f3f6745dfd5b48f72ab6addb560e234",
"rev_dir_sha1_git": "d92a21446387fa28410e5a74379c934298f39ae2",
"rev_dir_rev_path": "libtess2",
}
@pytest.fixture(scope="function")
def release(tests_data):
"""Fixture returning a random release ingested into the test archive."""
return random.choice(_known_swh_objects(tests_data, "releases"))
@pytest.fixture(scope="function")
def releases(tests_data):
"""Fixture returning random releases ingested into the test archive."""
return random.choices(
_known_swh_objects(tests_data, "releases"), k=random.randint(2, 8)
)
@pytest.fixture(scope="function")
def unknown_release(tests_data):
"""Fixture returning a random release not ingested into the test archive."""
while True:
new_release = random_sha1()
sha1_bytes = hash_to_bytes(new_release)
if tests_data["storage"].release_get([sha1_bytes])[0] is None:
return new_release
@pytest.fixture(scope="function")
def snapshot(tests_data):
"""Fixture returning a random snapshot ingested into the test archive."""
return random.choice(_known_swh_objects(tests_data, "snapshots"))
@pytest.fixture(scope="function")
def unknown_snapshot(tests_data):
"""Fixture returning a random snapshot not ingested into the test archive."""
while True:
new_snapshot = random_sha1()
sha1_bytes = hash_to_bytes(new_snapshot)
if tests_data["storage"].snapshot_get_branches(sha1_bytes) is None:
return new_snapshot
@pytest.fixture(scope="function")
def origin(tests_data):
"""Fixture returning a random origin ingested into the test archive."""
return random.choice(_known_swh_objects(tests_data, "origins"))
@functools.lru_cache(maxsize=None)
def _origin_with_multiple_visits():
tests_data = get_tests_data()
origins = []
storage = tests_data["storage"]
for origin in tests_data["origins"]:
visit_page = storage.origin_visit_get(origin["url"])
if len(visit_page.results) > 1:
origins.append(origin)
return origins
@pytest.fixture(scope="function")
def origin_with_multiple_visits():
"""Fixture returning a random origin with multiple visits ingested
into the test archive.
"""
return random.choice(_origin_with_multiple_visits())
@functools.lru_cache(maxsize=None)
def _origin_with_releases():
tests_data = get_tests_data()
origins = []
for origin in tests_data["origins"]:
snapshot = snapshot_get_latest(tests_data["storage"], origin["url"])
if any([b.target_type.value == "release" for b in snapshot.branches.values()]):
origins.append(origin)
return origins
@pytest.fixture(scope="function")
def origin_with_releases():
"""Fixture returning a random origin with releases ingested into the test archive."""
return random.choice(_origin_with_releases())
@functools.lru_cache(maxsize=None)
def _origin_with_pull_request_branches():
tests_data = get_tests_data()
origins = []
storage = tests_data["storage"]
for origin in storage.origin_list(limit=1000).results:
snapshot = snapshot_get_latest(storage, origin.url)
if any([b"refs/pull/" in b for b in snapshot.branches]):
origins.append(origin)
return origins
@pytest.fixture(scope="function")
def origin_with_pull_request_branches():
"""Fixture returning a random origin with pull request branches ingested
into the test archive.
"""
return random.choice(_origin_with_pull_request_branches())
@functools.lru_cache(maxsize=None)
def _object_type_swhid(object_type):
return list(
filter(
lambda swhid: swhid.object_type == object_type,
_known_swh_objects(get_tests_data(), "swhids"),
)
)
@pytest.fixture(scope="function")
def content_swhid():
"""Fixture returning a qualified SWHID for a random content object
ingested into the test archive.
"""
return random.choice(_object_type_swhid(ObjectType.CONTENT))
@pytest.fixture(scope="function")
def directory_swhid():
"""Fixture returning a qualified SWHID for a random directory object
ingested into the test archive.
"""
return random.choice(_object_type_swhid(ObjectType.DIRECTORY))
@pytest.fixture(scope="function")
def release_swhid():
"""Fixture returning a qualified SWHID for a random release object
ingested into the test archive.
"""
return random.choice(_object_type_swhid(ObjectType.RELEASE))
@pytest.fixture(scope="function")
def revision_swhid():
"""Fixture returning a qualified SWHID for a random revision object
ingested into the test archive.
"""
return random.choice(_object_type_swhid(ObjectType.REVISION))
@pytest.fixture(scope="function")
def snapshot_swhid():
"""Fixture returning a qualified SWHID for a snapshot object
ingested into the test archive.
"""
return random.choice(_object_type_swhid(ObjectType.SNAPSHOT))
@pytest.fixture(scope="function", params=list(ObjectType))
def unknown_core_swhid(request) -> CoreSWHID:
"""Fixture returning an unknown core SWHID.
Tests using this will be called once per object type.
"""
return CoreSWHID(
object_type=request.param,
object_id=random_sha1_bytes(),
)
# Fixture to manipulate data from a sample archive used in the tests
@pytest.fixture(scope="function")
def archive_data(tests_data):
return _ArchiveData(tests_data)
# Fixture to manipulate indexer data from a sample archive used in the tests
@pytest.fixture(scope="function")
def indexer_data(tests_data):
return _IndexerData(tests_data)
# Custom data directory for requests_mock
@pytest.fixture
def datadir():
return os.path.join(os.path.abspath(os.path.dirname(__file__)), "resources")
class _ArchiveData:
"""
Helper class to manage data from a sample test archive.
It is initialized with a reference to an in-memory storage
containing raw tests data.
It is basically a proxy to Storage interface but it overrides some methods
to retrieve those tests data in a json serializable format in order to ease
tests implementation.
"""
def __init__(self, tests_data):
self.storage = tests_data["storage"]
def __getattr__(self, key):
if key == "storage":
raise AttributeError(key)
# Forward calls to non overridden Storage methods to wrapped
# storage instance
return getattr(self.storage, key)
def content_find(self, content: Dict[str, Any]) -> Dict[str, Any]:
cnt_ids_bytes = {
algo_hash: hash_to_bytes(content[algo_hash])
for algo_hash in ALGORITHMS
if content.get(algo_hash)
}
cnt = self.storage.content_find(cnt_ids_bytes)
return converters.from_content(cnt[0].to_dict()) if cnt else cnt
def content_get(self, cnt_id: str) -> Dict[str, Any]:
cnt_id_bytes = hash_to_bytes(cnt_id)
content = self.storage.content_get([cnt_id_bytes])[0]
if content:
content_d = content.to_dict()
content_d.pop("ctime", None)
else:
content_d = None
return converters.from_swh(
content_d, hashess={"sha1", "sha1_git", "sha256", "blake2s256"}
)
def content_get_data(self, cnt_id: str) -> Optional[Dict[str, Any]]:
cnt_id_bytes = hash_to_bytes(cnt_id)
cnt_data = self.storage.content_get_data(cnt_id_bytes)
if cnt_data is None:
return None
return converters.from_content({"data": cnt_data, "sha1": cnt_id_bytes})
def directory_get(self, dir_id):
return {"id": dir_id, "content": self.directory_ls(dir_id)}
def directory_ls(self, dir_id):
cnt_id_bytes = hash_to_bytes(dir_id)
dir_content = map(
converters.from_directory_entry, self.storage.directory_ls(cnt_id_bytes)
)
return list(dir_content)
def release_get(self, rel_id: str) -> Optional[Dict[str, Any]]:
rel_id_bytes = hash_to_bytes(rel_id)
rel_data = self.storage.release_get([rel_id_bytes])[0]
return converters.from_release(rel_data) if rel_data else None
def revision_get(self, rev_id: str) -> Optional[Dict[str, Any]]:
rev_id_bytes = hash_to_bytes(rev_id)
rev_data = self.storage.revision_get([rev_id_bytes])[0]
return converters.from_revision(rev_data) if rev_data else None
def revision_log(self, rev_id, limit=None):
rev_id_bytes = hash_to_bytes(rev_id)
return list(
map(
converters.from_revision,
self.storage.revision_log([rev_id_bytes], limit=limit),
)
)
def snapshot_get_latest(self, origin_url):
snp = snapshot_get_latest(self.storage, origin_url)
return converters.from_snapshot(snp.to_dict())
def origin_get(self, origin_urls):
origins = self.storage.origin_get(origin_urls)
return [converters.from_origin(o.to_dict()) for o in origins]
def origin_visit_get(self, origin_url):
next_page_token = None
visits = []
while True:
visit_page = self.storage.origin_visit_get(
origin_url, page_token=next_page_token
)
next_page_token = visit_page.next_page_token
for visit in visit_page.results:
visit_status = self.storage.origin_visit_status_get_latest(
origin_url, visit.visit
)
visits.append(
converters.from_origin_visit(
{**visit_status.to_dict(), "type": visit.type}
)
)
if not next_page_token:
break
return visits
def origin_visit_get_by(self, origin_url: str, visit_id: int) -> OriginVisitInfo:
visit = self.storage.origin_visit_get_by(origin_url, visit_id)
assert visit is not None
visit_status = self.storage.origin_visit_status_get_latest(origin_url, visit_id)
assert visit_status is not None
return converters.from_origin_visit(
{**visit_status.to_dict(), "type": visit.type}
)
def origin_visit_status_get_latest(
self,
origin_url,
type: Optional[str] = None,
allowed_statuses: Optional[List[str]] = None,
require_snapshot: bool = False,
):
visit_status = origin_get_latest_visit_status(
self.storage,
origin_url,
type=type,
allowed_statuses=allowed_statuses,
require_snapshot=require_snapshot,
)
return (
converters.from_origin_visit(visit_status.to_dict())
if visit_status
else None
)
def snapshot_get(self, snapshot_id):
snp = snapshot_get_all_branches(self.storage, hash_to_bytes(snapshot_id))
return converters.from_snapshot(snp.to_dict())
def snapshot_get_branches(
self, snapshot_id, branches_from="", branches_count=1000, target_types=None
):
partial_branches = self.storage.snapshot_get_branches(
hash_to_bytes(snapshot_id),
branches_from.encode(),
branches_count,
target_types,
)
return converters.from_partial_branches(partial_branches)
def snapshot_get_head(self, snapshot):
if snapshot["branches"]["HEAD"]["target_type"] == "alias":
target = snapshot["branches"]["HEAD"]["target"]
head = snapshot["branches"][target]["target"]
else:
head = snapshot["branches"]["HEAD"]["target"]
return head
def snapshot_count_branches(self, snapshot_id):
counts = dict.fromkeys(("alias", "release", "revision"), 0)
counts.update(self.storage.snapshot_count_branches(hash_to_bytes(snapshot_id)))
counts.pop(None, None)
return counts
class _IndexerData:
"""
Helper class to manage indexer tests data
It is initialized with a reference to an in-memory indexer storage
containing raw tests data.
It also defines class methods to retrieve those tests data in
a json serializable format in order to ease tests implementation.
"""
def __init__(self, tests_data):
self.idx_storage = tests_data["idx_storage"]
self.mimetype_indexer = tests_data["mimetype_indexer"]
self.license_indexer = tests_data["license_indexer"]
def content_add_mimetype(self, cnt_id):
self.mimetype_indexer.run([hash_to_bytes(cnt_id)])
def content_get_mimetype(self, cnt_id):
mimetype = self.idx_storage.content_mimetype_get([hash_to_bytes(cnt_id)])[
0
].to_dict()
return converters.from_filetype(mimetype)
def content_add_license(self, cnt_id):
self.license_indexer.run([hash_to_bytes(cnt_id)])
def content_get_license(self, cnt_id):
cnt_id_bytes = hash_to_bytes(cnt_id)
licenses = self.idx_storage.content_fossology_license_get([cnt_id_bytes])
for license in licenses:
yield converters.from_swh(license.to_dict(), hashess={"id"})
@pytest.fixture
def keycloak_oidc(keycloak_oidc, mocker):
keycloak_config = get_config()["keycloak"]
keycloak_oidc.server_url = keycloak_config["server_url"]
keycloak_oidc.realm_name = keycloak_config["realm_name"]
keycloak_oidc.client_id = OIDC_SWH_WEB_CLIENT_ID
keycloak_oidc_client = mocker.patch("swh.web.auth.views.keycloak_oidc_client")
keycloak_oidc_client.return_value = keycloak_oidc
return keycloak_oidc
@pytest.fixture
def subtest(request):
"""A hack to explicitly set up and tear down fixtures.
This fixture allows you to set up and tear down fixtures within the test
function itself. This is useful (necessary!) for using Hypothesis inside
pytest, as hypothesis will call the test function multiple times, without
setting up or tearing down fixture state as it is normally the case.
Copied from the pytest-subtesthack project, public domain license
(https://github.com/untitaker/pytest-subtesthack).
"""
parent_test = request.node
def inner(func):
if hasattr(Function, "from_parent"):
item = Function.from_parent(
parent_test,
name=request.function.__name__ + "[]",
originalname=request.function.__name__,
callobj=func,
)
else:
item = Function(
name=request.function.__name__ + "[]", parent=parent_test, callobj=func
)
nextitem = parent_test # prevents pytest from tearing down module fixtures
item.ihook.pytest_runtest_setup(item=item)
try:
item.ihook.pytest_runtest_call(item=item)
finally:
item.ihook.pytest_runtest_teardown(item=item, nextitem=nextitem)
return inner
@pytest.fixture
def swh_scheduler(swh_scheduler):
config = get_config()
scheduler = config["scheduler"]
config["scheduler"] = swh_scheduler
# create load-git and load-hg task types
for task_type in TASK_TYPES.values():
# see https://forge.softwareheritage.org/rDSCHc46ffadf7adf24c7eb3ffce062e8ade3818c79cc # noqa
task_type["type"] = task_type["type"].replace("load-test-", "load-", 1)
swh_scheduler.create_task_type(task_type)
# create load-svn task type
swh_scheduler.create_task_type(
{
"type": "load-svn",
"description": "Update a Subversion repository",
"backend_name": "swh.loader.svn.tasks.DumpMountAndLoadSvnRepository",
"default_interval": timedelta(days=64),
"min_interval": timedelta(hours=12),
"max_interval": timedelta(days=64),
"backoff_factor": 2,
"max_queue_length": None,
"num_retries": 7,
"retry_delay": timedelta(hours=2),
}
)
# create load-cvs task type
swh_scheduler.create_task_type(
{
"type": "load-cvs",
"description": "Update a CVS repository",
"backend_name": "swh.loader.cvs.tasks.DumpMountAndLoadSvnRepository",
"default_interval": timedelta(days=64),
"min_interval": timedelta(hours=12),
"max_interval": timedelta(days=64),
"backoff_factor": 2,
"max_queue_length": None,
"num_retries": 7,
"retry_delay": timedelta(hours=2),
}
)
# create load-bzr task type
swh_scheduler.create_task_type(
{
"type": "load-bzr",
"description": "Update a Bazaar repository",
"backend_name": "swh.loader.bzr.tasks.LoadBazaar",
"default_interval": timedelta(days=64),
"min_interval": timedelta(hours=12),
"max_interval": timedelta(days=64),
"backoff_factor": 2,
"max_queue_length": None,
"num_retries": 7,
"retry_delay": timedelta(hours=2),
}
)
# add method to add load-archive-files task type during tests
def add_load_archive_task_type():
swh_scheduler.create_task_type(
{
"type": "load-archive-files",
"description": "Load tarballs",
"backend_name": "swh.loader.package.archive.tasks.LoadArchive",
"default_interval": timedelta(days=64),
"min_interval": timedelta(hours=12),
"max_interval": timedelta(days=64),
"backoff_factor": 2,
"max_queue_length": None,
"num_retries": 7,
"retry_delay": timedelta(hours=2),
}
)
swh_scheduler.add_load_archive_task_type = add_load_archive_task_type
yield swh_scheduler
config["scheduler"] = scheduler
get_scheduler_load_task_types.cache_clear()
@pytest.fixture(scope="session")
def django_db_setup(request, django_db_blocker, postgresql_proc):
from django.conf import settings
settings.DATABASES["default"].update(
{
("ENGINE", "django.db.backends.postgresql"),
("NAME", get_config()["test_db"]["name"]),
("USER", postgresql_proc.user),
("HOST", postgresql_proc.host),
("PORT", postgresql_proc.port),
}
)
with django_db_blocker.unblock():
setup_databases(
verbosity=request.config.option.verbose, interactive=False, keepdb=False
)
@pytest.fixture
def staff_user():
return User.objects.create_user(username="admin", password="", is_staff=True)
@pytest.fixture
def regular_user():
return User.objects.create_user(username="johndoe", password="")
@pytest.fixture
def regular_user2():
return User.objects.create_user(username="janedoe", password="")
@pytest.fixture
def add_forge_moderator():
moderator = User.objects.create_user(username="add-forge moderator", password="")
moderator.user_permissions.add(
create_django_permission(ADD_FORGE_MODERATOR_PERMISSION)
)
return moderator
@pytest.fixture
def mailmap_admin():
mailmap_admin = User.objects.create_user(username="mailmap-admin", password="")
mailmap_admin.user_permissions.add(
create_django_permission(MAILMAP_ADMIN_PERMISSION)
)
return mailmap_admin
@pytest.fixture
def mailmap_user():
mailmap_user = User.objects.create_user(username="mailmap-user", password="")
mailmap_user.user_permissions.add(create_django_permission(MAILMAP_PERMISSION))
return mailmap_user
def reload_urlconf():
from django.conf import settings
clear_url_caches()
urlconf = settings.ROOT_URLCONF
if urlconf in sys.modules:
reload(sys.modules[urlconf])
else:
import_module(urlconf)
class SwhSettingsWrapper(SettingsWrapper):
def __setattr__(self, attr: str, value) -> None:
super().__setattr__(attr, value)
reload_urlconf()
def finalize(self) -> None:
super().finalize()
reload_urlconf()
@pytest.fixture
def django_settings():
"""Override pytest-django settings fixture in order to reload URLs
when modifying settings in test and after test execution as most of
them depend on installed django apps in swh-web.
"""
settings = SwhSettingsWrapper()
yield settings
settings.finalize()
diff --git a/swh/web/auth/management/__init__.py b/swh/web/tests/mailmap/__init__.py
similarity index 100%
rename from swh/web/auth/management/__init__.py
rename to swh/web/tests/mailmap/__init__.py
diff --git a/swh/web/tests/mailmap/test_app.py b/swh/web/tests/mailmap/test_app.py
new file mode 100644
index 00000000..c31a2c0d
--- /dev/null
+++ b/swh/web/tests/mailmap/test_app.py
@@ -0,0 +1,32 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import pytest
+
+from django.urls import get_resolver
+
+from swh.web.common.utils import reverse
+from swh.web.mailmap.urls import urlpatterns
+from swh.web.tests.django_asserts import assert_not_contains
+from swh.web.tests.utils import check_html_get_response
+
+
+@pytest.mark.django_db
+def test_mailmap_deactivate(client, mailmap_admin, django_settings):
+ """Check mailmap feature is deactivated when the swh.web.mailmap django
+ application is not in installed apps."""
+
+ django_settings.SWH_DJANGO_APPS = [
+ app for app in django_settings.SWH_DJANGO_APPS if app != "swh.web.mailmap"
+ ]
+
+ url = reverse("swh-web-homepage")
+ client.force_login(mailmap_admin)
+ resp = check_html_get_response(client, url, status_code=200)
+ assert_not_contains(resp, "swh-mailmap-admin-item")
+
+ mailmap_view_names = set(urlpattern.name for urlpattern in urlpatterns)
+ all_view_names = set(get_resolver().reverse_dict.keys())
+ assert mailmap_view_names & all_view_names == set()
diff --git a/swh/web/tests/auth/test_mailmap.py b/swh/web/tests/mailmap/test_mailmap.py
similarity index 99%
rename from swh/web/tests/auth/test_mailmap.py
rename to swh/web/tests/mailmap/test_mailmap.py
index a948ece2..4a9c2029 100644
--- a/swh/web/tests/auth/test_mailmap.py
+++ b/swh/web/tests/mailmap/test_mailmap.py
@@ -1,600 +1,600 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from io import StringIO
import json
from typing import Dict
from psycopg2.extras import execute_values
import pytest
from django.core.management import call_command
from django.db import transaction
from swh.model.model import Person
-from swh.web.auth.models import UserMailmap, UserMailmapEvent
from swh.web.common.utils import reverse
+from swh.web.mailmap.models import UserMailmap, UserMailmapEvent
from swh.web.tests.utils import (
check_api_post_response,
check_http_get_response,
check_http_post_response,
)
@pytest.mark.django_db(transaction=True)
@pytest.mark.parametrize("view_name", ["profile-mailmap-add", "profile-mailmap-update"])
def test_mailmap_endpoints_anonymous_user(api_client, view_name):
url = reverse(view_name)
check_api_post_response(api_client, url, status_code=403)
@pytest.mark.django_db(transaction=True)
def test_mailmap_endpoints_user_with_permission(
api_client, mailmap_user, mailmap_admin
):
for user, name in ((mailmap_user, "bar"), (mailmap_admin, "baz")):
UserMailmapEvent.objects.all().delete()
api_client.force_login(user)
request_data = {"from_email": f"{name}@example.org", "display_name": name}
for view_name in ("profile-mailmap-add", "profile-mailmap-update"):
url = reverse(view_name)
check_api_post_response(
api_client,
url,
data=request_data,
status_code=200,
)
# FIXME: use check_api_get_responses; currently this crashes without
# content_type="application/json"
resp = check_http_get_response(
api_client,
reverse("profile-mailmap-list"),
status_code=200,
content_type="application/json",
).data
assert len(resp) == 1
assert resp[0]["from_email"] == f"{name}@example.org"
assert resp[0]["display_name"] == name
events = UserMailmapEvent.objects.order_by("timestamp").all()
assert len(events) == 2
assert events[0].request_type == "add"
assert json.loads(events[0].request) == request_data
assert events[1].request_type == "update"
assert json.loads(events[1].request) == request_data
@pytest.mark.django_db(transaction=True)
def test_mailmap_add_duplicate(api_client, mailmap_user, mailmap_admin):
for user, name in ((mailmap_user, "foo"), (mailmap_admin, "bar")):
api_client.force_login(user)
check_api_post_response(
api_client,
reverse("profile-mailmap-add"),
data={"from_email": f"{name}@example.org", "display_name": name},
status_code=200,
)
check_api_post_response(
api_client,
reverse("profile-mailmap-add"),
data={"from_email": f"{name}@example.org", "display_name": name},
status_code=400,
)
@pytest.mark.django_db(transaction=True)
def test_mailmap_add_full(api_client, mailmap_user, mailmap_admin):
for user, name in ((mailmap_user, "foo"), (mailmap_admin, "bar")):
api_client.force_login(user)
UserMailmapEvent.objects.all().delete()
request_data = {
"from_email": f"{name}@example.org",
"from_email_verified": True,
"from_email_verification_request_date": "2021-02-07T14:04:15Z",
"display_name": name,
"display_name_activated": True,
"to_email": "baz@example.org",
"to_email_verified": True,
"to_email_verification_request_date": "2021-02-07T15:54:59Z",
}
check_api_post_response(
api_client,
reverse("profile-mailmap-add"),
data=request_data,
status_code=200,
)
resp = check_http_get_response(
api_client,
reverse("profile-mailmap-list"),
status_code=200,
content_type="application/json",
).data
assert len(resp) == 1
assert resp[0].items() >= request_data.items()
events = UserMailmapEvent.objects.all()
assert len(events) == 1
assert events[0].request_type == "add"
assert json.loads(events[0].request) == request_data
assert events[0].successful
@pytest.mark.django_db(transaction=True)
def test_mailmap_endpoints_error_response(api_client, mailmap_user, mailmap_admin):
for user in (mailmap_user, mailmap_admin):
api_client.force_login(user)
UserMailmapEvent.objects.all().delete()
url = reverse("profile-mailmap-add")
resp = check_api_post_response(api_client, url, status_code=400)
assert b"from_email" in resp.content
url = reverse("profile-mailmap-update")
resp = check_api_post_response(api_client, url, status_code=400)
assert b"from_email" in resp.content
events = UserMailmapEvent.objects.order_by("timestamp").all()
assert len(events) == 2
assert events[0].request_type == "add"
assert json.loads(events[0].request) == {}
assert not events[0].successful
assert events[1].request_type == "update"
assert json.loads(events[1].request) == {}
assert not events[1].successful
@pytest.mark.django_db(transaction=True)
def test_mailmap_update(api_client, mailmap_user, mailmap_admin):
for user, name in ((mailmap_user, "foo"), (mailmap_admin, "bar")):
api_client.force_login(user)
UserMailmapEvent.objects.all().delete()
before_add = datetime.datetime.now(tz=datetime.timezone.utc)
check_api_post_response(
api_client,
reverse("profile-mailmap-add"),
data={
"from_email": f"{name}1@example.org",
"display_name": "Display Name 1",
},
status_code=200,
)
check_api_post_response(
api_client,
reverse("profile-mailmap-add"),
data={
"from_email": f"{name}2@example.org",
"display_name": "Display Name 2",
},
status_code=200,
)
after_add = datetime.datetime.now(tz=datetime.timezone.utc)
user_id = None if user == mailmap_admin else str(user.id)
mailmaps = list(
UserMailmap.objects.filter(user_id=user_id).order_by("from_email").all()
)
assert len(mailmaps) == 2, mailmaps
assert mailmaps[0].from_email == f"{name}1@example.org", mailmaps
assert mailmaps[0].display_name == "Display Name 1", mailmaps
assert before_add <= mailmaps[0].last_update_date <= after_add
assert mailmaps[1].from_email == f"{name}2@example.org", mailmaps
assert mailmaps[1].display_name == "Display Name 2", mailmaps
assert before_add <= mailmaps[0].last_update_date <= after_add
before_update = datetime.datetime.now(tz=datetime.timezone.utc)
check_api_post_response(
api_client,
reverse("profile-mailmap-update"),
data={
"from_email": f"{name}1@example.org",
"display_name": "Display Name 1b",
},
status_code=200,
)
after_update = datetime.datetime.now(tz=datetime.timezone.utc)
mailmaps = list(
UserMailmap.objects.filter(user_id=user_id).order_by("from_email").all()
)
assert len(mailmaps) == 2, mailmaps
assert mailmaps[0].from_email == f"{name}1@example.org", mailmaps
assert mailmaps[0].display_name == "Display Name 1b", mailmaps
assert before_update <= mailmaps[0].last_update_date <= after_update
assert mailmaps[1].from_email == f"{name}2@example.org", mailmaps
assert mailmaps[1].display_name == "Display Name 2", mailmaps
assert before_add <= mailmaps[1].last_update_date <= after_add
events = UserMailmapEvent.objects.order_by("timestamp").all()
assert len(events) == 3
assert events[0].request_type == "add"
assert events[1].request_type == "add"
assert events[2].request_type == "update"
@pytest.mark.django_db(transaction=True)
def test_mailmap_update_from_email_not_found(api_client, mailmap_admin):
api_client.force_login(mailmap_admin)
check_api_post_response(
api_client,
reverse("profile-mailmap-update"),
data={
"from_email": "invalid@example.org",
"display_name": "Display Name",
},
status_code=404,
)
NB_MAILMAPS = 20
MM_PER_PAGE = 10
def _create_mailmaps(client):
mailmaps = []
for i in range(NB_MAILMAPS):
resp = check_http_post_response(
client,
reverse("profile-mailmap-add"),
data={
"from_email": f"user{i:02d}@example.org",
"display_name": f"User {i:02d}",
},
status_code=200,
)
mailmaps.append(json.loads(resp.content))
return mailmaps
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_mailmap_list_datatables_no_parameters(client, mailmap_admin):
client.force_login(mailmap_admin)
mailmaps = _create_mailmaps(client)
url = reverse("profile-mailmap-list-datatables")
resp = check_http_get_response(client, url, status_code=200)
mailmap_data = json.loads(resp.content)
assert mailmap_data["recordsTotal"] == NB_MAILMAPS
assert mailmap_data["recordsFiltered"] == NB_MAILMAPS
# mailmaps sorted by ascending from_email by default
for i in range(10):
assert mailmap_data["data"][i]["from_email"] == mailmaps[i]["from_email"]
@pytest.mark.django_db(transaction=True, reset_sequences=True)
@pytest.mark.parametrize("sort_direction", ["asc", "desc"])
def test_mailmap_list_datatables_ordering(client, mailmap_admin, sort_direction):
client.force_login(mailmap_admin)
mailmaps = _create_mailmaps(client)
mailmaps_sorted = list(sorted(mailmaps, key=lambda d: d["display_name"]))
all_display_names = [mm["display_name"] for mm in mailmaps_sorted]
if sort_direction == "desc":
all_display_names = list(reversed(all_display_names))
for i in range(NB_MAILMAPS // MM_PER_PAGE):
url = reverse(
"profile-mailmap-list-datatables",
query_params={
"draw": i,
"length": MM_PER_PAGE,
"start": i * MM_PER_PAGE,
"order[0][column]": 2,
"order[0][dir]": sort_direction,
"columns[2][name]": "display_name",
},
)
resp = check_http_get_response(client, url, status_code=200)
data = json.loads(resp.content)
assert data["draw"] == i
assert data["recordsFiltered"] == NB_MAILMAPS
assert data["recordsTotal"] == NB_MAILMAPS
assert len(data["data"]) == MM_PER_PAGE
display_names = [mm["display_name"] for mm in data["data"]]
expected_display_names = all_display_names[
i * MM_PER_PAGE : (i + 1) * MM_PER_PAGE
]
assert display_names == expected_display_names
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_mailmap_list_datatables_search(client, mailmap_admin):
client.force_login(mailmap_admin)
_create_mailmaps(client)
search_value = "user1"
url = reverse(
"profile-mailmap-list-datatables",
query_params={
"draw": 1,
"length": MM_PER_PAGE,
"start": 0,
"search[value]": search_value,
},
)
resp = check_http_get_response(client, url, status_code=200)
data = json.loads(resp.content)
assert data["draw"] == 1
assert data["recordsFiltered"] == MM_PER_PAGE
assert data["recordsTotal"] == NB_MAILMAPS
assert len(data["data"]) == MM_PER_PAGE
for mailmap in data["data"]:
assert search_value in mailmap["from_email"]
def populate_mailmap():
for (verified, activated) in (
(False, False),
(False, True),
(True, False),
(True, True),
):
verified_str = "V" if verified else ""
activated_str = "A" if activated else ""
UserMailmap.objects.create(
from_email=f"from_email{verified_str}{activated_str}@example.com",
display_name=f"Display Name {verified_str} {activated_str}".strip(),
from_email_verified=verified,
display_name_activated=activated,
)
def call_sync_mailmaps(*args) -> str:
out = StringIO()
err = StringIO()
call_command("sync_mailmaps", *args, stdout=out, stderr=err)
out.seek(0)
err.seek(0)
assert err.read() == ""
return out.read()
MAILMAP_KNOWN_FULLNAMES = (
"Original Name ",
"Original Name V ",
"Original Name A ",
"Original Name V A ",
"Original Name V A 2 ",
"Original Name V A 3 ",
)
MAILMAP_KNOWN_PEOPLE = tuple(
Person.from_fullname(f.encode()) for f in MAILMAP_KNOWN_FULLNAMES
)
def init_stub_storage_db(postgresql):
cur = postgresql.cursor()
cur.execute(
"""
CREATE TABLE person (
fullname bytea PRIMARY KEY,
name bytea,
email bytea,
displayname bytea
)
"""
)
execute_values(
cur,
"INSERT INTO person (fullname, name, email) VALUES %s",
(p.to_dict() for p in MAILMAP_KNOWN_PEOPLE),
template="(%(fullname)s, %(name)s, %(email)s)",
)
cur.execute("CREATE INDEX ON person (email)")
postgresql.commit()
cur.close()
return postgresql.dsn
def get_displaynames(postgresql) -> Dict[str, str]:
with postgresql.cursor() as cur:
cur.execute(
"SELECT fullname, displayname FROM person WHERE displayname IS NOT NULL"
)
return {bytes(f).decode("utf-8"): bytes(d).decode("utf-8") for (f, d) in cur}
@pytest.mark.django_db(transaction=True)
def test_sync_mailmaps_dry_run(postgresql):
with transaction.atomic():
populate_mailmap()
dsn = init_stub_storage_db(postgresql)
out = call_sync_mailmaps(dsn)
assert "(dry run)" in out
assert "Synced 1 mailmaps to swh.storage database" in out
assert get_displaynames(postgresql) == {}
assert (
UserMailmap.objects.filter(
from_email_verified=True,
display_name_activated=True,
mailmap_last_processing_date__isnull=False,
).count()
== 0
)
@pytest.mark.django_db(transaction=True)
def test_sync_mailmaps_perform(postgresql):
with transaction.atomic():
populate_mailmap()
dsn = init_stub_storage_db(postgresql)
out = call_sync_mailmaps("--perform", dsn)
assert "(dry run)" not in out
assert "Synced 1 mailmaps to swh.storage database" in out
expected_displaynames = {
"Original Name V A ": "Display Name V A",
"Original Name V A 2 ": "Display Name V A",
"Original Name V A 3 ": "Display Name V A",
}
assert get_displaynames(postgresql) == expected_displaynames
assert (
UserMailmap.objects.filter(
from_email_verified=True,
display_name_activated=True,
mailmap_last_processing_date__isnull=False,
).count()
== 1
)
@pytest.mark.django_db(transaction=True)
def test_sync_mailmaps_with_to_email(postgresql):
with transaction.atomic():
populate_mailmap()
dsn = init_stub_storage_db(postgresql)
call_sync_mailmaps("--perform", dsn)
expected_displaynames = {
"Original Name V A ": "Display Name V A",
"Original Name V A 2 ": "Display Name V A",
"Original Name V A 3 ": "Display Name V A",
}
assert get_displaynames(postgresql) == expected_displaynames
# Add a non-valid to_email
with transaction.atomic():
for mailmap in UserMailmap.objects.filter(
from_email_verified=True, display_name_activated=True
):
mailmap.to_email = "to_email@example.com"
mailmap.save()
call_sync_mailmaps("--perform", dsn)
assert get_displaynames(postgresql) == expected_displaynames
# Verify the relevant to_email
with transaction.atomic():
for mailmap in UserMailmap.objects.filter(
from_email_verified=True, display_name_activated=True
):
mailmap.to_email_verified = True
mailmap.save()
call_sync_mailmaps("--perform", dsn)
new_displayname = "Display Name V A "
expected_displaynames = {
"Original Name V A ": new_displayname,
"Original Name V A 2 ": new_displayname,
"Original Name V A 3 ": new_displayname,
}
assert get_displaynames(postgresql) == expected_displaynames
@pytest.mark.django_db(transaction=True)
def test_sync_mailmaps_disable(postgresql):
"""Check that disabling a mailmap only happens once"""
with transaction.atomic():
populate_mailmap()
dsn = init_stub_storage_db(postgresql)
# Do the initial mailmap sync
call_sync_mailmaps("--perform", dsn)
assert len(get_displaynames(postgresql)) == 3
updated = 0
# Disable a display name
with transaction.atomic():
# Cannot use update() because `last_update_date` would not be updated
for mailmap in UserMailmap.objects.filter(
from_email_verified=True, display_name_activated=True
):
mailmap.display_name_activated = False
mailmap.save()
updated += 1
assert updated == 1
# Sync mailmaps again
out = call_sync_mailmaps("--perform", dsn)
assert "1 mailmaps to disable" in out
assert get_displaynames(postgresql) == {}
# Update a displayname by hand
with postgresql.cursor() as cur:
cur.execute(
"UPDATE person SET displayname='Manual Display Name' "
"WHERE fullname='Original Name V A '"
)
expected_displaynames = {
"Original Name V A ": "Manual Display Name"
}
assert get_displaynames(postgresql) == expected_displaynames
# Sync mailmaps one last time. No mailmaps should be disabled
out = call_sync_mailmaps("--perform", dsn)
assert "0 mailmaps to disable" in out
assert get_displaynames(postgresql) == expected_displaynames
diff --git a/swh/web/tests/mailmap/test_migrations.py b/swh/web/tests/mailmap/test_migrations.py
new file mode 100644
index 00000000..cc956609
--- /dev/null
+++ b/swh/web/tests/mailmap/test_migrations.py
@@ -0,0 +1,15 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+APP_NAME = "swh_web_mailmap"
+
+MIGRATION_0001 = "0001_initial"
+
+
+def test_mailmap_django_app(migrator):
+ state = migrator.apply_tested_migration((APP_NAME, MIGRATION_0001))
+ UserMailmap = state.apps.get_model(APP_NAME, "UserMailmap")
+ assert UserMailmap